Introduction

The Events module in the xpander.ai SDK handles background tasks and event streaming. It connects to the platform for real-time task execution and monitoring.

Overview

The SDK supports two event-handling mechanisms:
  • Agent Event Listeners
    • Set up using the @on_task decorator
    • Handle incoming task execution requests
    • Integrate with Server Sent Events (SSE) for real-time communication
    • Includes strict validation to ensure the task parameter is correctly specified and returned
  • Task Event Streaming
    • Monitor updates in real-time during task execution
    • Track task progress, tool calls, and lifecycle events
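
To make the Server-Sent Events bullet concrete, here is a minimal sketch of how an SSE text stream is split into events. It follows the general SSE wire format (field lines, with a blank line ending each event) and is not the SDK's own client. The function name parse_sse, the event name "task_update", and the payloads are assumptions made up for this example.

```python
def parse_sse(lines):
    """Yield (event_name, data) tuples from an iterable of SSE text lines."""
    event, data = "message", []  # "message" is the default SSE event name
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line terminates the current event.
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            field, _, value = line.partition(":")
            # A single leading space after the colon is not part of the value.
            value = value[1:] if value.startswith(" ") else value
            if field == "event":
                event = value
            elif field == "data":
                data.append(value)


# Hypothetical stream content, for illustration only.
sample = [
    "event: task_update\n",
    'data: {"status": "running"}\n',
    "\n",
    ": keep-alive\n",
    "data: done\n",
    "\n",
]
parsed = list(parse_sse(sample))
print(parsed)
```

In practice the SDK manages the SSE connection itself. The sketch only illustrates the kind of framing involved.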

Strict Validation

The @on_task decorator validates the decorated function and its arguments to catch incorrect usage early:

Parameter Validation

  • Required task Parameter: The decorated function must have a parameter named task
  • Task Type Validation: The task parameter must be an instance of the Task class
  • Return Type Validation: The function must return an instance of the Task class
  • Test Task Validation: If provided, test_task must be an instance of LocalTaskTest

Validation Examples

Valid Function Signature

from xpander_sdk import on_task
from xpander_sdk.modules.tasks.sub_modules.task import Task

@on_task
def valid_handler(task: Task) -> Task:
    # Process the task
    task.result = "Processed successfully"
    return task

Invalid Function Signatures (Will Raise Errors)

# Missing 'task' parameter - TypeError will be raised
@on_task
def invalid_no_task():
    pass

# Wrong parameter name - TypeError will be raised
@on_task
def invalid_wrong_param(my_task):
    pass

# Not returning Task instance - TypeError will be raised at runtime
@on_task
def invalid_return_type(task):
    return "string"  # Should return Task instance

Examples

Agent Event Listeners

Handle incoming task execution requests for agent deployment.

Asynchronous Example

Use the @on_task decorator to create an event handler:

from xpander_sdk import on_task

@on_task
async def handle_task(task):
    print(f"Processing task: {task.id}")
    # Custom task logic
    task.result = "Task processed successfully"
    return task

Synchronous Example

from xpander_sdk import on_task

@on_task
def handle_task(task):
    print(f"Processing task: {task.id}")
    # Custom task logic
    task.result = "Task processed successfully"
    return task
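
Because a handler may be either a coroutine function or a plain function, it can help to exercise the handler body locally before decorating it. The sketch below shows one way to run either kind to completion. The run_handler helper is a hypothetical name, and SimpleNamespace stands in for a real Task object. It does not reflect how the SDK dispatches handlers internally.

```python
import asyncio
import inspect
from types import SimpleNamespace


def run_handler(handler, task):
    """Run a sync or async handler to completion and return its result.

    Illustrative helper only; the SDK's own dispatch logic may differ.
    """
    if inspect.iscoroutinefunction(handler):
        # Async handlers need an event loop to drive them.
        return asyncio.run(handler(task))
    return handler(task)


async def async_handler(task):
    task.result = "async done"
    return task


def sync_handler(task):
    task.result = "sync done"
    return task


# SimpleNamespace is a placeholder for a Task with 'id' and 'result' fields.
async_out = run_handler(async_handler, SimpleNamespace(id="t-1", result=None))
sync_out = run_handler(sync_handler, SimpleNamespace(id="t-2", result=None))
print(async_out.result, sync_out.result)
```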

Advanced Decorator Usage

The @on_task decorator supports additional parameters for advanced use cases:

Using Custom Configuration

from xpander_sdk import on_task, Configuration

# Create custom configuration
custom_config = Configuration(
    api_key="your-api-key",
    base_url="https://custom.api.url"
)

@on_task(configuration=custom_config)
async def handle_task_with_config(task):
    # Task will use the custom configuration
    print(f"Processing task with custom config: {task.id}")
    task.result = "Processed with custom configuration"
    return task

Using Test Tasks for Local Development

from xpander_sdk import on_task
from xpander_sdk.modules.tasks.models.task import LocalTaskTest, AgentExecutionInput
from xpander_sdk.models.shared import OutputFormat

# Define a local test task for development
test_task = LocalTaskTest(
    input=AgentExecutionInput(text="What can you do?"),
    output_format=OutputFormat.Json,
    output_schema={"capabilities": "list of capabilities"}
)

@on_task(test_task=test_task)
async def handle_test_task(task):
    print(f"Testing with local task: {task.input.text}")
    task.result = {
        "capabilities": [
            "Data analysis",
            "Text processing",
            "API integration"
        ]
    }
    return task

Task Event Streaming

Monitor task execution in real-time.

Asynchronous Example

import asyncio

from xpander_sdk import Agent
from xpander_sdk.models.events import TaskUpdateEventType

async def main():
    # Load agent
    agent = Agent.load("agent-id")

    # Create task with event streaming enabled
    task = await agent.acreate_task(
        prompt="Analyze this dataset",
        file_urls=["https://example.com/data.csv"],
        events_streaming=True
    )

    # Stream events until the task finishes
    async for event in task.aevents():
        print(f"Event Type: {event.type}")
        if event.type == TaskUpdateEventType.TaskFinished:
            break

# Top-level await is not valid in a script, so run the coroutine explicitly
asyncio.run(main())
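
The stream-until-finished pattern can be tried without a live agent by feeding it a fake async iterator. The following sketch assumes only that each event exposes a type attribute, as in the example above. The collect_until helper, the fake_events generator, and the type strings are all made up for illustration.

```python
import asyncio
from types import SimpleNamespace


async def collect_until(events, finished_type):
    """Collect events from an async iterator, stopping after finished_type."""
    seen = []
    async for event in events:
        seen.append(event)
        if event.type == finished_type:
            break
    return seen


async def fake_events():
    # Stand-in for task.aevents(); these type strings are hypothetical.
    for name in ("started", "tool_call", "finished", "never_reached"):
        yield SimpleNamespace(type=name)


collected = asyncio.run(collect_until(fake_events(), "finished"))
print([event.type for event in collected])
```

With a real task, the same helper could plausibly be given task.aevents() and TaskUpdateEventType.TaskFinished in place of the fakes.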

Synchronous Example

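from xpander_sdk.models.events import TaskUpdateEventType

# Monitor task events; assumes `task` was created with events_streaming=True,
# as in the asynchronous example above
for event in task.events():
    print(f"Event: {event.type}")
    if event.type == TaskUpdateEventType.TaskFinished:
        break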
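
To track progress, tool calls, and lifecycle events rather than just printing them, the loop can tally events by type. This is a rough sketch under the same assumption that events expose a type attribute. The count_event_types helper and the sample type names are hypothetical, not SDK values.

```python
from collections import Counter
from types import SimpleNamespace


def count_event_types(events, finished_type):
    """Tally events by type, stopping once finished_type is seen."""
    counts = Counter()
    for event in events:
        counts[event.type] += 1
        if event.type == finished_type:
            break
    return counts


# Placeholder events; a real run would iterate over task.events() instead.
sample_events = [
    SimpleNamespace(type="started"),
    SimpleNamespace(type="tool_call"),
    SimpleNamespace(type="tool_call"),
    SimpleNamespace(type="finished"),
    SimpleNamespace(type="ignored_after_finish"),
]
summary = count_event_types(sample_events, "finished")
print(dict(summary))
```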

Continue to the [Events API Reference](/API reference/events/API reference) for detailed class and method documentation.

Support

Contact

For further assistance, contact support: dev@xpander.ai